package com.tpvlog.dfs.backupnode.log;

import com.tpvlog.dfs.backupnode.BackupNode;
import com.tpvlog.dfs.backupnode.file.FSNameSystem;
import com.tpvlog.dfs.backupnode.server.NameNodeRpcClient;

import java.io.File;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/**
 * Background thread that periodically produces an fsimage snapshot (a "checkpoint").
 *
 * <p>While the owning {@link BackupNode} reports that it is running, the thread wakes up once per
 * {@link #POLL_INTERVAL_MILLIS}. A checkpoint is attempted when metadata recovery has finished,
 * more than {@link #CHECKPOINT_INTERVAL} milliseconds have passed since the recorded checkpoint
 * time, and the NameNode RPC client reports that it is running.
 *
 * <p>Interrupting the thread ends the polling loop.
 *
 * @author Ressmix
 */
public class FSImageCheckPointer extends Thread {
    /** Minimum time between two checkpoints, in milliseconds (one hour). */
    private static final long CHECKPOINT_INTERVAL = 60L * 60 * 1000;

    /** Pause between two polling rounds, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Directory (with trailing separator) that holds fsimage files and checkpoint metadata. */
    private static final String BACKUP_DIR = "C:\\Users\\Ressmix\\Desktop\\backupnode\\";

    /** File that stores "checkpointTime_syncedTxid_fsimageFileName". */
    private static final String CHECKPOINT_INFO_PATH = BACKUP_DIR + "checkpoint-info.meta";

    private final BackupNode backupNode;
    private final FSNameSystem namesystem;
    private final NameNodeRpcClient rpcClient;

    /**
     * @param backupNode owner whose {@code isRunning()} flag controls the lifetime of the loop
     * @param namesystem source of the fsimage and of the in-memory checkpoint record
     * @param rpcClient  client used to report the checkpoint txid to the NameNode
     */
    public FSImageCheckPointer(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient rpcClient) {
        this.backupNode = backupNode;
        this.namesystem = namesystem;
        this.rpcClient = rpcClient;
    }

    /**
     * Polling loop. Every round is followed by a pause, including rounds that were skipped or
     * that failed, so the loop never spins without sleeping.
     */
    @Override
    public void run() {
        System.out.println("fsimage checkpoint定时调度线程启动......");
        while (backupNode.isRunning()) {
            try {
                checkpointIfDue();
            } catch (Exception e) {
                // A failed round must not kill the scheduler; it is retried on a later round.
                e.printStackTrace();
            }
            if (!pause()) {
                return;
            }
        }
    }

    /**
     * Runs one polling round: performs a checkpoint if all preconditions hold, otherwise returns.
     *
     * @throws Exception if the checkpoint itself fails
     */
    private void checkpointIfDue() throws Exception {
        // 1. Wait until the BackupNode has finished recovering its metadata.
        if (!namesystem.isFinishedRecover()) {
            System.out.println("当前还没完成元数据恢复，不进行checkpoint......");
            return;
        }

        // 2. Only checkpoint when the interval since the last checkpoint has elapsed.
        long now = System.currentTimeMillis();
        long checkpointTime = namesystem.getCheckPoint().getCheckpointTime();
        if (now - checkpointTime <= CHECKPOINT_INTERVAL) {
            return;
        }

        // 3. Skip this round while the NameNode cannot be reached.
        if (!rpcClient.getRunning()) {
            System.out.println("namenode当前无法访问，不执行checkpoint......");
            return;
        }

        System.out.println("准备执行checkpoint操作......");
        doCheckpoint();
        System.out.println("完成checkpoint操作......");
    }

    /**
     * Sleeps for one polling interval.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored),
     *         {@code true} otherwise
     */
    private boolean pause() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Performs one checkpoint.
     *
     * <p>The previous fsimage file is removed last, after the new file and the checkpoint
     * metadata have been written, so a failure part-way through leaves the old snapshot on disk.
     *
     * @throws Exception if writing the fsimage or the checkpoint metadata fails
     */
    private void doCheckpoint() throws Exception {
        // 1. Obtain the fsimage from the name system.
        FSImage fsimage = namesystem.getFSImage();
        // Remember the previous file before writeFSImageFile overwrites the checkpoint record.
        String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
        // 2. Write the new fsimage file to disk.
        writeFSImageFile(fsimage);
        // 3. Send the fsimage to the NameNode.
        uploadFSImageFile(fsimage);
        // 4. Report the checkpoint txid to the NameNode.
        updateCheckpointTxid(fsimage);
        // 5. Persist the checkpoint metadata.
        saveCheckpointInfo(fsimage);
        // 6. Remove the fsimage file of the previous checkpoint.
        removeLastFSImageFile(lastFsimageFile);
    }

    /**
     * Persists the in-memory checkpoint record as {@code time_txid_fsimageFileName}.
     *
     * @param fsimage the fsimage of the current checkpoint (not read here; the values come from
     *                the checkpoint record updated by {@link #writeFSImageFile})
     * @throws Exception if the metadata file cannot be written
     */
    private void saveCheckpointInfo(FSImage fsimage) throws Exception {
        long time = namesystem.getCheckPoint().getCheckpointTime();
        long checkpointTxid = namesystem.getCheckPoint().getSyncedTxid();
        String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
        String info = time + "_" + checkpointTxid + "_" + lastFsimageFile;

        // NOTE(review): bytes use the platform default charset, as before; confirm that the
        // code reading this file decodes it the same way.
        writeAndSync(CHECKPOINT_INFO_PATH, ByteBuffer.wrap(info.getBytes()));
        System.out.println("checkpoint信息持久化到磁盘文件......");
    }

    /**
     * Reports the checkpoint txid to the NameNode.
     */
    private void updateCheckpointTxid(FSImage fsimage) {
        rpcClient.updateCheckpointTxid(fsimage.getMaxTxid());
    }

    /**
     * Deletes the fsimage file of the previous checkpoint.
     *
     * <p>The checkpoint record stores only the file name, so a non-absolute name is resolved
     * against {@link #BACKUP_DIR}, the directory the file was written to. Nothing is deleted when
     * no previous file is recorded or when it has the same name as the current fsimage file.
     *
     * @param lastFsimageFile file name recorded by the previous checkpoint, may be {@code null}
     */
    private void removeLastFSImageFile(String lastFsimageFile) {
        if (lastFsimageFile == null || lastFsimageFile.isEmpty()) {
            return;
        }
        // Same txid as last time: the "old" file is the one that was just rewritten.
        if (lastFsimageFile.equals(namesystem.getCheckPoint().getFsimageFile())) {
            return;
        }

        File file = new File(lastFsimageFile);
        if (!file.isAbsolute()) {
            file = new File(BACKUP_DIR + lastFsimageFile);
        }
        if (file.exists() && !file.delete()) {
            System.err.println("failed to delete previous fsimage file: " + file.getPath());
        }
    }

    /**
     * Writes the fsimage JSON to {@code fsimage-<maxTxid>.meta} in {@link #BACKUP_DIR} and then
     * updates the in-memory checkpoint record (file name, time, synced txid).
     *
     * @param fsimage the fsimage to persist
     * @throws Exception if the file cannot be written
     */
    private void writeFSImageFile(FSImage fsimage) throws Exception {
        String filename = "fsimage-" + fsimage.getMaxTxid() + ".meta";
        // NOTE(review): platform default charset, unchanged from the previous behaviour.
        ByteBuffer buffer = ByteBuffer.wrap(fsimage.getFsimageJson().getBytes());

        writeAndSync(BACKUP_DIR + filename, buffer);

        // Update the checkpoint record only after the file is safely on disk.
        namesystem.getCheckPoint().setFsimageFile(filename);
        namesystem.getCheckPoint().setCheckpointTime(System.currentTimeMillis());
        namesystem.getCheckPoint().setSyncedTxid(fsimage.getMaxTxid());
    }

    /**
     * Replaces the content of {@code path} with {@code buffer} and forces it to disk.
     *
     * <p>Opening the file through {@link FileOutputStream} truncates an existing file, so no
     * bytes of a longer previous version are left behind.
     *
     * @param path   file to create or overwrite
     * @param buffer bytes to write; drained completely
     * @throws Exception if the file cannot be opened, written or synced
     */
    private static void writeAndSync(String path, ByteBuffer buffer) throws Exception {
        try (FileOutputStream out = new FileOutputStream(path);
             FileChannel channel = out.getChannel()) {
            // A single write call is not guaranteed to drain the buffer.
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(false);
        }
    }

    /**
     * Starts an {@link FSImageUploader} for the given fsimage.
     */
    private void uploadFSImageFile(FSImage fsimage) {
        FSImageUploader fsimageUploader = new FSImageUploader(fsimage);
        fsimageUploader.start();
    }
}
